// Copyright (c) 2020-present, INSPUR Co, Ltd. All rights reserved.
// This source code is licensed under Apache 2.0 License.
//
// Created by florian on 05.08.15.

#pragma once
#include "pure_mem/epoche.h"
#include "rocksdb/slice.h"
#include <atomic>
#include <stdint.h>
#include <string.h>

namespace syn_art_fullkey {

enum class NTypes : uint8_t {
  NodeType4 = 0,
  NodeType16 = 1,
  NodeType48 = 2,
  NodeType256 = 3
};

struct Prefix {
  uint16_t length_ = 0;
  char prefix_[0]; // 可变长结构体，保证数据存储在相邻空间中
};

// N class is abstract class for all node in ART tree.
// and support general function like insert \ nodeType \ lock etc.
class N {
protected:
  // 2b type 60b version 1b lock 1b obsolete
  std::atomic<uint64_t> typeVersionLockObsolete_{0b100};
  // version 1, unlocked, not obsolete
  uint16_t level_ = 0;
  uint16_t count_ = 0;
  std::atomic<void *> noNextKey_leaf_{nullptr};

  N(const N &) = delete;
  N(N &&) = delete;

public:
  void initital(NTypes type, uint32_t level) {
    typeVersionLockObsolete_.store(0b100);
    noNextKey_leaf_.store(nullptr);
    level_ = level;
    count_ = 0;
    typeVersionLockObsolete_.fetch_add((static_cast<uint64_t>(type) << 62));
  }
  inline NTypes getType() const {
    return static_cast<NTypes>(
        typeVersionLockObsolete_.load(std::memory_order_relaxed) >> 62);
  }
  inline uint16_t getLevel() const { return level_; }
  inline uint16_t getCount() const { return count_; }
  inline uint64_t getVersion() const { return typeVersionLockObsolete_.load(); }
  inline bool setFullKeyLeaf(void *oldvalue, void *value) {
    return noNextKey_leaf_.compare_exchange_strong(oldvalue, value);
  }
  inline void *getFullKeyLeaf() { return noNextKey_leaf_.load(); }

  static inline bool isLeaf(const N *n) {
    return (reinterpret_cast<uint64_t>(n) & (static_cast<uint64_t>(1) << 63)) ==
           (static_cast<uint64_t>(1) << 63);
  }

  static inline N *setLeaf(void *tid) {
    return reinterpret_cast<N *>((uint64_t)tid |
                                 (static_cast<uint64_t>(1) << 63));
  }

  static inline void *getLeaf(const N *n) {
    return (void *)((uint64_t)n & ((static_cast<uint64_t>(1) << 63) - 1));
  }

  inline bool isObsolete() { return (getVersion() & 1) == 1; }
  inline bool isLocked() { return ((getVersion() & 0b10) == 0b10); }

  inline void writeUnlock() { typeVersionLockObsolete_.fetch_add(0b10); }
  // can only be called when node is locked
  inline void writeUnlockObsolete() {
    typeVersionLockObsolete_.fetch_add(0b11);
  }

  inline bool readUnlockOrRestart(uint64_t startRead) const {
    return startRead == typeVersionLockObsolete_.load();
  }
  inline bool checkOrRestart(uint64_t startRead) const {
    return readUnlockOrRestart(startRead);
  }

  void writeLockOrRestart(bool &needRestart);

  inline void lockVersionOrRestart(bool &needRestart) {
    while (true) {
      uint64_t version = getVersion();
      if (isLocked() || isObsolete()) {
        needRestart = true;
        return;
      }
      if (typeVersionLockObsolete_.compare_exchange_strong(version,
                                                           version + 0b10)) {
        return;
      }
    }
  }
  static void insertAndUnlock(N *node, N *parentNode, uint8_t keyParent,
                              uint8_t key, N *val, bool &needRestart);

  static void removeAndUnlock(N *node, uint8_t key, N *parentNode,
                              uint8_t keyParent, bool &needRestart);

  static N *getChild(const uint8_t k, N *node);
  static void change(N *node, uint8_t key, N *val);

  static void insertGrow(N *&n, N *parentNode, uint8_t keyParent,
                                  uint8_t key, N *val, bool &needRestart);

  static void getChildrenSmall(const N *node, uint8_t start, uint8_t end,
                               std::tuple<uint8_t, N *> children1[],
                               uint32_t &childrenCount, u_int32_t childMax);
  static void getChildrenLarge(const N *node, uint8_t start, uint8_t end,
                               std::tuple<uint8_t, N *> children1[],
                               uint32_t &childrenCount, u_int32_t childMax);
  static N *copyNodeWithNewPrefix(N *node, const rocksdb::Slice &prefix);
  static N *nodePrefixChange(N *node, const rocksdb::Slice &prefix);
  static Prefix &getPrefix(N *node);
  static bool insert(N *node, uint8_t key, N *val);
  static N *newTreeNode(NTypes type, uint16_t level,
                        const rocksdb::Slice &prefix);
  static void copyTo(N *srcNode, N *destNode);
  static void removeAll(N *node);
};

// N4 class is node type in ART tree, store four children at most.
class N4 : public N {
private:
  std::atomic<uint8_t> keys_[4];
  std::atomic<N *> children_[4];
  Prefix prefix_;

public:
  void init(uint32_t level, const rocksdb::Slice &prefix) {
    initital(NTypes::NodeType4, level);
    prefix_.length_ = prefix.size();
    memcpy(prefix_.prefix_, prefix.data_, prefix.size());
    memset(keys_, 0, sizeof(keys_));
    memset(children_, 0, sizeof(children_));
  }

  bool insert(uint8_t key, N *n);

  void copyTo(N *n) const;

  void change(uint8_t key, N *val);

  N *getChild(const uint8_t k) const;

  void getChildrenSmall(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;
  void getChildrenLarge(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;
  Prefix &getPrefix() { return prefix_; }
};

// N16 class is node type in ART tree, store sixteen children at most.
class N16 : public N {
public:
  std::atomic<uint8_t> keys_[16];
  std::atomic<N *> children_[16];
  Prefix prefix_;

  static uint8_t flipSign(uint8_t keyByte) {
    // Flip the sign bit, enables signed SSE comparison of unsigned values, used
    // by Node16
    return keyByte ^ 128;
  }

  static inline unsigned ctz(uint16_t x) {
    // Count trailing zeros, only defined for x>0
#ifdef __GNUC__
    return __builtin_ctz(x);
#else
    // Adapted from Hacker's Delight
    unsigned n = 1;
    if ((x & 0xFF) == 0) {
      n += 8;
      x = x >> 8;
    }
    if ((x & 0x0F) == 0) {
      n += 4;
      x = x >> 4;
    }
    if ((x & 0x03) == 0) {
      n += 2;
      x = x >> 2;
    }
    return n - (x & 1);
#endif
  }

  std::atomic<N *> *getChildPos(const uint8_t k);

public:
  void init(uint32_t level, const rocksdb::Slice &prefix) {
    initital(NTypes::NodeType16, level);
    prefix_.length_ = prefix.size();
    memcpy(prefix_.prefix_, prefix.data_, prefix.size());
    memset(keys_, 0, sizeof(keys_));
    memset(children_, 0, sizeof(children_));
  }

  bool insert(uint8_t key, N *n);

  void copyTo(N *n) const;

  void change(uint8_t key, N *val);

  N *getChild(const uint8_t k) const;

  void getChildrenSmall(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;
  void getChildrenLarge(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;

  Prefix &getPrefix() { return prefix_; }
};

// N48 class is node type in ART tree, store 48 children at most.
class N48 : public N {
  std::atomic<uint8_t> childIndex_[256];
  std::atomic<N *> children_[48];
  Prefix prefix_;

public:
  static const uint8_t emptyMarker = 48;

  void init(uint32_t level, const rocksdb::Slice &prefix) {
    initital(NTypes::NodeType48, level);
    prefix_.length_ = prefix.size();
    memcpy(prefix_.prefix_, prefix.data_, prefix.size());
    memset(childIndex_, emptyMarker, sizeof(childIndex_));
    memset(children_, 0, sizeof(children_));
  }

  bool insert(uint8_t key, N *n);

  void copyTo(N *n) const;

  void change(uint8_t key, N *val);

  N *getChild(const uint8_t k) const;

  void getChildrenSmall(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;
  void getChildrenLarge(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;
  Prefix &getPrefix() { return prefix_; }
};

// N256 class is node type in ART tree, can store 256 children.
class N256 : public N {
  std::atomic<N *> children_[256];
  Prefix prefix_;

public:
  void init(uint32_t level, const rocksdb::Slice &prefix) {
    initital(NTypes::NodeType256, level);
    prefix_.length_ = prefix.size();
    memcpy(prefix_.prefix_, prefix.data_, prefix.size());
    memset(children_, '\0', sizeof(children_));
  }

  bool insert(uint8_t key, N *val);

  void copyTo(N *n) const;

  void change(uint8_t key, N *n);

  N *getChild(const uint8_t k) const;

  void getChildrenSmall(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;
  void getChildrenLarge(uint8_t start, uint8_t end,
                        std::tuple<uint8_t, N *> *&childrenList,
                        uint32_t &childrenCount, u_int32_t childMax) const;
  Prefix &getPrefix() { return prefix_; }
};


} // namespace syn_art_fullkey
